package com.hao.chapter11;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Example job that runs two Top-N queries over a click table with Flink SQL:
 * a plain Top-N over all users, and a windowed Top-N over 10-second tumbling
 * windows. Both results are printed as changelog streams.
 */
public class TopNExample {

    /**
     * DDL for {@code clickTable}, declaring the time attribute directly in the
     * table definition. The table reads CSV from {@code input/clicks.txt} via
     * the filesystem connector.
     */
    private static final String CLICK_TABLE_DDL = "CREATE TABLE clickTable (" +
            "user_name STRING," +
            "url STRING," +
            "ts BIGINT," +
            // Event-time column: ts / 1000 is passed to FROM_UNIXTIME, which yields a
            // formatted date-time string that TO_TIMESTAMP then parses.
            // NOTE(review): presumably ts is epoch milliseconds — confirm against the input file.
            "et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000))," +
            // Watermark trails the event time by one second.
            " WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
            ")WITH(" +
            " 'connector' = 'filesystem'," +
            " 'path' = 'input/clicks.txt'," +
            " 'format' = 'csv'" +
            ")";

    /**
     * Plain Top-N: counts urls per user over the whole table, ranks users by
     * that count in descending order and keeps the first two rows.
     */
    private static final String TOP_N_QUERY = "SELECT user_name,cnt,row_num " +
            "FROM (" +
            "  SELECT *,ROW_NUMBER() OVER( " +
            "      ORDER BY cnt DESC " +
            "    ) AS row_num " +
            "  FROM (SELECT user_name,count(url) AS cnt FROM clickTable GROUP BY user_name)" +
            ") WHERE row_num <= 2";

    /**
     * Per-user url count inside 10-second tumbling windows on {@code et};
     * used as the inner query of {@link #WINDOW_TOP_N_QUERY}.
     */
    private static final String WINDOW_COUNT_QUERY =
            "SELECT user_name,COUNT(url) AS cnt,window_start,window_end " +
            "FROM TABLE(" +
            " TUMBLE(TABLE clickTable,DESCRIPTOR(et),INTERVAL '10' SECOND)" +
            ")" +
            "GROUP BY user_name,window_start,window_end";

    /**
     * Window Top-N: ranks the per-window counts within each
     * (window_start, window_end) partition and keeps the first two rows per window.
     */
    private static final String WINDOW_TOP_N_QUERY = "SELECT user_name,cnt,row_num,window_end " +
            "FROM (" +
            "  SELECT *,ROW_NUMBER() OVER (" +
            "     PARTITION BY window_start,window_end " +
            "     ORDER BY cnt DESC" +
            "    ) AS row_num " +
            "  FROM (" + WINDOW_COUNT_QUERY + ")" +
            ") WHERE row_num <= 2";

    /**
     * Registers the click table, builds both Top-N queries, prints their
     * changelog streams and executes the job.
     *
     * @param args command-line arguments; not used
     * @throws Exception declared because the job set-up and execution calls may fail
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism is fixed at 1 for the whole job.
        streamEnv.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);

        // 1. Register the source table; its time attribute is defined in the DDL itself.
        tEnv.executeSql(CLICK_TABLE_DDL);

        // 2. Plain Top-N: the two users with the highest view count so far.
        Table overallTopTwo = tEnv.sqlQuery(TOP_N_QUERY);

        // 3. Window Top-N: the two most active users within each 10-second window.
        Table perWindowTopTwo = tEnv.sqlQuery(WINDOW_TOP_N_QUERY);

        // 4. Convert both tables to changelog streams and print them with a prefix.
        tEnv.toChangelogStream(overallTopTwo).print("topNResult 2 : ");
        tEnv.toChangelogStream(perWindowTopTwo).print("windowTopNResult 2 : ");

        streamEnv.execute();
    }
}
